Automatic Music Notation

Translate your song into staff notation
A Project By Haoyang Li (hl2425) and Yue Zhao (yz2697)


Demonstration Video


Introduction

We built an automatic music notation tool. It consists of two processes running on a Raspberry Pi: a main program that records the audio and generates the notation, and a server that presents the final results. You sing into the microphone attached to the Raspberry Pi and tap the beats on the touchscreen; once the end button is pressed, the corresponding staff notation is generated. The recorded beats, audio and staff notation are accessible through a web page hosted by the server. The recording can also be paused for a break and resumed afterwards. Accuracy is bounded by the quality of the recording, the quality of the tapped beats and the faults of the pitch detection algorithm, but overall, it works.


A picture of our project
Figure 1. Our project

Project Objective

  • Get a microphone working
  • Write an audio recorder on the Raspberry Pi
  • Figure out how to slice the recorded audio into short pieces properly
  • Detect a single pitch out of each short piece
  • Study the basics of musical notations
  • Decide the clef for notation and the durations of pitches
  • Transform clef, durations and pitches into staff notation
  • Write a web page to organize the recordings and notations into a readable style
  • Build a server to present the web page when the notations are generated
  • Have them running together without falling apart

Design

Figure 2. The overall design of the main program.

The overall design of the main program is shown in Figure 2. The program has two states: an idle state, in which it only polls the start button and the quit button, and a started state, in which it polls all four buttons as well as the touchscreen for taps. When the program is launched, it stays in the idle state until the start button is pressed. It then starts the recording, prepares the data structures that remember states and events, and enters the started state. In the started state, the recording always runs in the background while the program polls for events, including pause, resume (the start button doubles as the resume button in the started state) and beat. To end the recording and get the results, you press the end button, which ends the recording and calls the other functions that process the audio, detect the pitches and generate the notation. It also notifies the server so that the results can easily be viewed through a web page. Whenever the quit button is pressed, the entire program exits.

The server is quite simple. It hosts a web page that pings the server periodically, and on each ping the server reads exc.json to check for an update. Once the main program finishes a piece, it notifies the server by setting "update" in exc.json to 2. The next time the server receives a ping, it checks the JSON file, notices the update, clears it and answers the ping with the update. The web page then requests the results (i.e. the WAV file, the SVG notation and the SVG wave visualization) and the server serves them.
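The exchange through exc.json can be sketched as follows. This is a minimal sketch under stated assumptions, not the server's actual code: the function name handle_ping, the shape of its return value, and the choice of -1 as the "cleared" value (taken from the initial exc.json in the appendix) are all assumptions made for illustration.

```python
import json
import os
import tempfile


def leave_message(path, key, value):
    """Writer side (main program): set one key in the exchange file."""
    with open(path, "r") as f:
        data = json.load(f)
    data[key] = value
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def handle_ping(path):
    """Server side (hypothetical): report a pending update once, then clear it."""
    with open(path, "r") as f:
        data = json.load(f)
    # Assumption: any positive value means "a new piece is ready"
    has_update = data.get("update", -1) > 0
    if has_update:
        data["update"] = -1  # assumed "no update" value, as in the initial exc.json
        with open(path, "w") as f:
            json.dump(data, f, indent=2)
    return {"update": has_update, "melody": data.get("melody")}


# Usage: the main program flags an update, the next ping sees it exactly once
exchange_path = os.path.join(tempfile.mkdtemp(), "exc.json")
with open(exchange_path, "w") as f:
    json.dump({"melody": "melody_0", "update": -1}, f)
leave_message(exchange_path, "update", 2)
first_ping = handle_ping(exchange_path)
second_ping = handle_ping(exchange_path)
```

Because the flag is cleared on the first ping that sees it, the web page is told about each finished piece only once.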


Recording

An explanation for the mechanism of recording
Figure 3. An example of a recording process

As illustrated in Figure 3, once you press the start button, the recording starts in the background (and times out when the maximum length is reached, which is 10 minutes here). Each tap on the music symbol displayed on the touchscreen is recorded as a beat, shown as a dotted red vertical line. You can pause the recording when necessary and resume it when you are ready, but these actions are only recorded as events; the recording itself stays on until you hit the end button. The beats are adjusted for the pauses on the fly, whereas the audio is adjusted once the recording is finished. An example of the recorded events looks like [('start', 0), ('pause', 2.44), ('resume', 4.84), ('pause', 7.13), ('resume', 8.94), ('end', 10.71)], where the first entry of each tuple is the event and the second entry is the relative time in seconds at which the event happened.

The end button also triggers the processing of the audio. The very first step is to cut the long recording into small pieces according to the recorded events and beats, merge these pieces into the final audio and save it as a WAV file. The pitch detection and music notation functions are then called in turn. To increase the accuracy of pitch detection, the start time in the event sequence is adjusted according to the first beat: it is moved to half a beat interval before the first beat.
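The cutting step can be illustrated with a short sketch. In the project this is the job of extract_pieces in recording.py, whose body is not reproduced here, so the helper names active_intervals and extract_samples below are hypothetical and the logic is only one plausible way to do it.

```python
from typing import List, Sequence, Tuple

Event = Tuple[str, float]


def active_intervals(events: List[Event]) -> List[Tuple[float, float]]:
    """Turn start/pause/resume/end events into (begin, end) spans in seconds."""
    spans = []
    begin = None
    for name, t in events:
        if name in ("start", "resume"):
            begin = t
        elif name in ("pause", "end") and begin is not None:
            spans.append((begin, t))
            begin = None
    return spans


def extract_samples(samples: Sequence, events: List[Event], sample_rate: int) -> list:
    """Keep only the samples that fall inside the non-paused spans."""
    kept = []
    for begin, end in active_intervals(events):
        kept.extend(samples[int(begin * sample_rate):int(end * sample_rate)])
    return kept


# The event list quoted in the text above
events = [('start', 0), ('pause', 2.44), ('resume', 4.84),
          ('pause', 7.13), ('resume', 8.94), ('end', 10.71)]
spans = active_intervals(events)
recorded_seconds = sum(end - begin for begin, end in spans)
```

For the example events, only the three spans between a start or resume and the following pause or end are kept, which is why the final audio is shorter than the raw recording.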


Pitch Detection

Although a large number of approaches, such as AMDF (Average Magnitude Difference Function) and ASMDF (Average Squared Mean Difference Function), have been proposed to detect pitches, the autocorrelation pitch detection algorithm has proven over time to be one of the most successful, partly due to its reliability and robustness. As the algorithm only involves computations directly on the waveform, it maps well onto the digital hardware of the Raspberry Pi, needing only a simple multiplier and a basic ALU (Arithmetic Logic Unit). The method is also almost phase insensitive and makes the pitch frequency easy to detect.

The theory of the autocorrelation algorithm is not complex. Given a discrete time signal $x(n)$, the autocorrelation function is defined as below:

$$R_x(\tau) = \lim_{N \to \infty} \frac{1}{2N+1} \sum_{n=-N}^{N} x(n)\, x(n+\tau)$$

In terms of pitch detection, if we confine the waveform to one single pitch, $x(n)$ would be periodic with period $T$, meaning for every $n$, $x(n+T) = x(n)$. Furthermore, it is not hard to prove that the autocorrelation function also becomes periodic with exactly the same period:

$$R_x(\tau + T) = R_x(\tau)$$

Hence, by sampling the sound wave at a sample rate $f_s$ (typically 44100 Hz or 22050 Hz) and calculating the autocorrelation function over the sampled signal $x(n)$, we can readily acquire the period $T$ (in samples) by recording the distance between two adjacent peaks. Once the period of the waveform is known, deriving the corresponding frequency $f$ is trivial, as shown below:

$$f = \frac{f_s}{T}$$
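The idea can be tried on a synthetic tone with a few lines of plain Python. This is a sketch under assumptions, not the project's implementation: the appendix code uses statsmodels and takes the first peak found by scipy, whereas this version picks the strongest autocorrelation value inside an allowed lag range, and the function names and the 8000 Hz test tone are chosen only for illustration.

```python
import math


def autocorrelation(x, max_lag):
    """Unnormalized autocorrelation r[k] = sum_n x[n] * x[n + k], k = 0..max_lag."""
    n = len(x)
    return [sum(x[i] * x[i + k] for i in range(n - k)) for k in range(max_lag + 1)]


def estimate_frequency(x, sample_rate, min_freq, max_freq):
    """Pick the lag with the highest autocorrelation inside the allowed range."""
    min_lag = int(sample_rate / max_freq)  # shortest period we accept, in samples
    max_lag = int(sample_rate / min_freq)  # longest period we accept, in samples
    r = autocorrelation(x, max_lag)
    best_lag = max(range(min_lag, max_lag + 1), key=lambda k: r[k])
    # Frequency is the sample rate divided by the period in samples
    return sample_rate / best_lag


# A pure 200 Hz sine sampled at 8000 Hz has a period of 40 samples
sample_rate = 8000
tone = [math.sin(2 * math.pi * 200 * n / sample_rate) for n in range(2000)]
freq = estimate_frequency(tone, sample_rate, min_freq=100, max_freq=400)
```

Restricting the lag range is one simple way to keep the estimate inside a plausible pitch range; it does not remove octave errors on real recordings.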

Then, with a fixed reference pitch and a specific tuning choice, frequency to pitch is a very nice one-to-one mapping:

Note    Frequency (Hz)    Wavelength (cm)
C3      130.81            263.74
D3      146.83            234.96
E3      164.81            209.33
F3      174.61            197.58
G3      196.00            176.02
A3      220.00            156.82
B3      246.94            139.71
C4      261.63            131.87
D4      293.66            117.48
E4      329.63            104.66
F4      349.23            98.79
G4      392.00            88.01
A4      440.00            78.41
B4      493.88            69.85
C5      523.25            65.93
Table 1. An instance that maps pitch notes to the corresponding frequencies

Table 1 above is an instance that maps the pitch notes of a typical alto clef range to their corresponding frequencies. We take the note whose frequency in the map is closest to the detected frequency as the detected pitch.
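The closest-frequency rule can be written directly against Table 1. This is only an illustrative sketch: the function name closest_note is hypothetical, and the appendix code relies on librosa.hz_to_note rather than an explicit table.

```python
# Frequencies in Hz, copied from Table 1
TABLE_1 = {
    "C3": 130.81, "D3": 146.83, "E3": 164.81, "F3": 174.61, "G3": 196.00,
    "A3": 220.00, "B3": 246.94, "C4": 261.63, "D4": 293.66, "E4": 329.63,
    "F4": 349.23, "G4": 392.00, "A4": 440.00, "B4": 493.88, "C5": 523.25,
}


def closest_note(freq_hz, table=TABLE_1):
    """Return the note whose table frequency is nearest to freq_hz."""
    return min(table, key=lambda note: abs(table[note] - freq_hz))


# 255 Hz lies between B3 (246.94 Hz) and C4 (261.63 Hz) and is nearer to C4
note = closest_note(255.0)
```

A detected frequency therefore never has to match a table entry exactly; it is snapped to the nearest listed note.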


Music Notation

Figure 4. The three clefs it supports and the pitch ranges of them, cited from Clef - wikipedia.

In Western musical notation, there are four key concepts: clef, time signature, pitch name and pitch duration. Admittedly, these four are not sufficient for real music notation; at the very least we would also need to consider the scale. For simplicity, however, we only consider these four. Furthermore, the time signature is a rather subjective parameter, i.e. the same music can be rewritten in different time signatures. For this demonstrative project, we only consider the 4/4 time signature. We treat each beat as having the length of a quarter note, which is an adjustable parameter. We only note down pitches between C2 and C6, which covers the most common range of frequencies. The clef whose range contains the most detected notes is chosen.

An example of the outputs of the pitch detection module looks like:

# Clef
clef = "alto"
# Pitch names
pitches = ['F♯3', 'G5', 'D♯4', 'F4', 'D♯4', 'F♯4', 'F4', 'E4', 'A3', 'A♯3', 'E4', 'G4', 'G♯5', 'A♯4']
# Durations, 1/4 means a quarter length
durations = [0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25, 0.25]

Each duration is matched to a pitch name and used to construct a music21.stream.Stream object. The stream is then converted into a LilyPond source and compiled with LilyPond, which directly exports an SVG graph of the corresponding musical notation.
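To give a feel for what a LilyPond source contains, the sketch below converts the pitch names and durations into LilyPond's absolute pitch syntax by hand. It is not the output of music21 and not the project's code; the helper names lily_pitch and lily_source are hypothetical, and only simple durations (1, 1/2, 1/4, 1/8, 1/16 of a whole note) are handled.

```python
def lily_pitch(name: str) -> str:
    """Convert a name such as 'F♯3' into LilyPond absolute pitch syntax."""
    if name == "R":
        return "r"  # LilyPond rest
    letter, accidental, octave = name[0].lower(), name[1:-1], int(name[-1])
    # Default (Dutch) note names: "is" for a sharp, "es" for a flat
    suffix = {"": "", "♯": "is", "#": "is", "♭": "es", "-": "es"}[accidental]
    # Plain "c" is C3; each "'" raises and each "," lowers the note by an octave
    marks = "'" * (octave - 3) if octave >= 3 else "," * (3 - octave)
    return letter + suffix + marks


def lily_source(clef: str, pitches: list, durations: list) -> str:
    """Build a one-line LilyPond music expression in 4/4."""
    notes = []
    for name, fraction in zip(pitches, durations):
        denominator = round(1 / fraction)
        if denominator not in (1, 2, 4, 8, 16) or abs(denominator * fraction - 1) > 1e-9:
            raise ValueError("unsupported duration: %r" % fraction)
        notes.append("%s%d" % (lily_pitch(name), denominator))
    return "{ \\clef %s \\time 4/4 %s }" % (clef, " ".join(notes))


source = lily_source("alto", ["F♯3", "G5", "D♯4", "R"], [0.25, 0.25, 0.25, 0.25])
```

For the four inputs above the result is `{ \clef alto \time 4/4 fis4 g''4 dis'4 r4 }`, where the trailing 4 marks a quarter note.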


Testing & Results

1. Counting Numbers with Pauses

In this experiment, a person counts numbers in Chinese (a language known for its four music-like tones) into the microphone of the system. The recording is paused and resumed twice while the counting continues without interruption.

Figure 5. The audio waves and beats of the counting numbers recording.
Figure 6. The generated notations of the counting numbers recording.

As you can see, although we cannot define a correct pitch for the counting, the notation does show a plausible trend. Note that the notation is shorter than the recording, since it only considers the non-paused pieces. You can hear it here:

You can sense the discontinuity if you listen carefully. It is caused by cutting the audio according to the recorded pause and resume events, as discussed in Recording.

2. Piano Finger Practice

In this experiment, a piano finger practice video from YouTube (here) is played while a person taps the beats along with it. Staff notation is best suited to the piano, and the wavy pattern of the finger practice should give us a sense of whether the notation is correct.

Figure 7. The audio waves and beats of the piano finger practice recording.
Figure 8. The generated notations of the piano finger practice recording.

The wavy pattern of the finger practice is quite clear in the generated notation. You may also notice some large shifts that are clearly wrong. These are good examples of the octave errors inherent to the algorithm we used for pitch detection, as discussed in Pitch Detection. You can hear the recording here:

Try to sense the wavy pattern yourself :)

3. Singing Amazing Grace

In this experiment, a person sings the first verse of Amazing Grace into the microphone while tapping the beats at their own pace. This recording is significantly longer, and singing at one's own pace should make the beats more accurate.

Figure 9. The audio waves and beats of the amazing grace singing recording.
Figure 10. The generated notations of the amazing grace singing recording.

As you can see, it does look like the music notation of a song, and the alto clef (suitable for the human voice) is correctly chosen, although the accuracy is still not that great. You can hear it here:

Well, it is functional enough to perform automatic music notation, but its accuracy clearly needs work.


Future

  • Although the autocorrelation algorithm works well in most scenarios, it occasionally suffers from octave-shifting problems. A plausible remedy may be the algorithm published by Balachandra Kumaraswamy in 2019, Octave Error Reduction in Pitch Detection Algorithms Using Fourier Series Approximation Method.
  • A more typical scenario than monophony is people playing music in an orchestra or a band, which results in a homophonic performance. In this case, one melody predominates while one or more additional strands flesh out the harmony. Unfortunately, the current algorithm only works well for the monophonic case. For homophony, a feasible approach is to first detect the predominant pitch based on the magnitude of the sound wave, then subtract the predominant waveform from the original wave and detect the next predominant pitch in the same way, repeating the procedure until all the melodies are detected.
  • Currently, our detector is only sensitive to standard notes from classical instruments like the piano. We would like to extend the detector to the human voice, so that a song sung with different expression and lyrics still yields the same staff notation. Researchers have had the same thought, and the paper A Robust Algorithm for Pitch Tracking (RAPT) by David Talkin may help.
  • One of the initial ideas of the project was to build a voice detector for improvisation, so that musicians do not have to spend a lot of time writing out the music sheet. Real-time detection would therefore be a natural next step. Instead of saving the whole sound from the microphone and producing the output after the recording, the detector would output the staff notation right after detecting the pitch and the clef. A naive but workable approach is to save the sound track every second, apply the detection algorithm to each saved track, and draw the notation on the screen as soon as the detection algorithm returns.
  • To make the project more complete, a reverse procedure that takes staff notation as input and outputs the music may be a nice future plan. Since this is basically converting an image into a sequence of pitches, a machine learning algorithm, typically a Convolutional Neural Network (CNN), would be a good idea. CNNs are good at classifying images thanks to their shift invariance. We could train the network on a powerful machine and deploy the trained model onto the Raspberry Pi.
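The naive real-time approach from the list above, running detection on one-second chunks, can be sketched as follows. The names one_second_chunks and notate_incrementally are hypothetical, and the detector is passed in as a callable, since any single-pitch detector with this shape would do.

```python
from typing import Callable, Iterator, List, Sequence


def one_second_chunks(samples: Sequence, sample_rate: int) -> Iterator[Sequence]:
    """Yield consecutive one-second slices; the last one may be shorter."""
    for start in range(0, len(samples), sample_rate):
        yield samples[start:start + sample_rate]


def notate_incrementally(samples: Sequence, sample_rate: int,
                         detect: Callable[[Sequence, int], str]) -> List[str]:
    """Run the detector on each chunk in turn and collect the detected notes."""
    notes = []
    for chunk in one_second_chunks(samples, sample_rate):
        # In a live system this is where the note would be drawn on the screen
        notes.append(detect(chunk, sample_rate))
    return notes


# Usage with a stand-in detector that just reports the chunk length
chunk_lengths = notate_incrementally(list(range(25)), 10, lambda chunk, sr: str(len(chunk)))
```

In a live setting the samples would arrive from the microphone rather than from a list, but the chunking logic stays the same.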

Work Distribution

A group picture

Project group picture


Haoyang Li

hl2425@cornell.edu

Conceptualization & system design

Controlling functions (GPIO)

Recording & notation functions

Server/web page design & implementation


Yue Zhao

yz2697@cornell.edu

Conceptualization & system design

Controlling functions (Touchscreen)

Pitch detection functions

Algorithm design & evaluations


Parts List

Total: $40.95


References


Code Appendix

The entire code is open sourced in automatic_music_notation.

# main.py
import time
import enum

from controller import start_pressed, pause_pressed, end_pressed, quit_pressed, beat_pressed
from exchange import leave_message
from logger import log_info, log_warning, log_debug
from notation import compose_pitches, write_music, export_music
from pitch_detection import pitch_detection, write_pitch
from pitch_detection import pitch_duration_detection, clef_detection, wave_plot
from recording import record_long_piece, extract_pieces, write_piece, _SAMPLE_RATE

# We use status flags to indicate the states of the recording
class STATUS(enum.Enum):
    START_PRESSED = 1
    PAUSE_PRESSED = 2
    END_PRESSED = 3

# The directory that holds the results
_RESULT_PATH = "./results"
def name2path(filename: str) -> str:
    return "%s/%s"%(_RESULT_PATH, filename)

# Assuming one beat is a quarter length
beat_base = 4
# This is the maximal length of a recording
long_seconds = 600
# This is the piece counter
melody_count = 0
# This is the sample rate
sample_rate = _SAMPLE_RATE
# This is the status of the recording program
status = STATUS.END_PRESSED
log_info("Start the program, sample_rate: %d /s, max recording: %d s"%(sample_rate, long_seconds))
while True:
    # Idle mode, it can either start or quit
    if quit_pressed():
        log_info("Quit the entire program")
        exit(0)
    if start_pressed():
        log_info("Start recording...")
        # Start a long recording process in the background
        start_time = time.time()
        last_time = start_time
        piece = record_long_piece(long_seconds)
        # Set the initial states
        paused = False
        paused_period = 0
        event_secs = [("start", 0)]
        beat_secs = []
        beats = []
        status = STATUS.START_PRESSED
    if status != STATUS.START_PRESSED:
        continue

    # Started mode, it can either pause, resume, or quit
    while status == STATUS.START_PRESSED:
        if quit_pressed():
            log_info("Quit the entire program")
            exit(0)

        if end_pressed():
            end_time = time.time()
            log_info("End the recording after %f seconds" %
                  (end_time - start_time))
            status = STATUS.END_PRESSED

        # Time out equals to an end_pressed()
        if time.time() - start_time >= long_seconds:
            end_time = time.time()
            log_info("End the recording due to time out (%f seconds)" %
                  (end_time - start_time))
            status = STATUS.END_PRESSED

        # The `paused` is used to debounce the buttons
        if pause_pressed() and not paused:
            pause_time = time.time()
            log_info("Pause the recording after %f seconds" %
                  (pause_time - start_time))
            event_secs.append(("pause", pause_time-start_time))
            paused = True

        if start_pressed() and paused:
            resume_time = time.time()
            log_info("Resume the recording after %f seconds" %
                  (resume_time - start_time))
            event_secs.append(("resume", resume_time-start_time))
            paused = False
            paused_period += resume_time - pause_time

        if beat_pressed() and not paused:
            beat_time = time.time()
            beat_secs.append(beat_time - start_time)
            beats.append(beat_time - start_time - paused_period)

    if status == STATUS.END_PRESSED:
        # Extract beat, and beat durations
        log_debug("Recorded beats: %s"%(str(beats)))
        beat_itv_length = len(beats) - 1
        if beat_itv_length < 1:
            log_warning("Fewer than two beats recorded, using 0.5s as default")
            # Keep at least one interval so the alignment below cannot fail
            beat_itv_length = max(1, int((end_time - start_time)*2))
            beat_itvs = [0.5]*beat_itv_length
            beats.append(0.5)
        else:
            beat_itvs = []
            for i in range(beat_itv_length):
                beat_itvs.append(beats[i+1] - beats[i])

        # Align the events with beats
        event_secs[0] = ('start', beats[0] - beat_itvs[0]/2)
        event_secs.append(("end", end_time - start_time))
        # Cut and concatenate the recordings according to events
        final_piece = extract_pieces(piece, event_secs)
        log_debug("Length of the Final Piece: %d" %len(final_piece))
        # Save the recording
        write_piece(final_piece, name2path("melody_%d.wav" % (melody_count)))
        
        log_info("Start creating the music notation")

        # Extract the raw pitches
        try:
            pitches = pitch_detection(name2path("melody_%d.wav" % (melody_count)), beat_itvs, sample_rate)
            log_debug("Length of beat intervals %d" %beat_itv_length)
            log_debug("Length of pitches %d" %len(pitches))
            log_debug("Detected pitches: %s"%(str(pitches)))
            # Save the extracted pitches
            write_pitch(pitches, name2path("pitches_%d" % (melody_count)))
        except Exception:
            log_warning("Pitch detection failed, likely the recorded audio is empty")
            continue
       
        # Post-processing the pitches according to the beat base
        pitches, durations = pitch_duration_detection(beat_base, pitches)
        log_debug("Durations: %s"%(str(durations)))

        # Determine the clef according to pitches
        clef = clef_detection(pitches)

        # Compose the pitches
        stream = compose_pitches(clef, pitches, durations)

        # Write the note stream and export it into a svg
        write_music(stream, name2path("melody_%d"%(melody_count)))
        export_music(stream, "melody_%d"%(melody_count))

        # Plot the last piece of wave
        wave_plot(piece, 0, sample_rate, beat_secs, end_time - start_time)

        leave_message("update", 2)
  
# controller.py
"""
Functions that connect the buttons and touchscreen to the main program
- Checking handles to decide whether a button is pressed or not
- Pygame codes that draw a music symbol
- Checking handle to decide whether the music symbol is touched or not
Control Panel Design:
- GPIO #17 (The First Button): Start/Continue Recording
- GPIO #22 (The Second Button): Pause Recording
- GPIO #23 (The Third Button): End Recording
- GPIO #27 (The Last Button): Quit
"""

import RPi.GPIO as GPIO
import pygame
import os

_START = 17
_PAUSE = 22
_END = 23
_QUIT = 27
_WIDTH = 320
_HEIGHT = 240
_IMAGE_PATH = "source/music_symbol.jpg"
_SIZE = 100
_RED = (255, 0, 0)
_WHITE = (255, 255, 255)

os.putenv('SDL_VIDEODRIVER', 'fbcon')  # Display on piTFT
os.putenv('SDL_FBDEV', '/dev/fb0')
os.putenv('SDL_MOUSEDRV', 'TSLIB')  # Track mouse clicks on piTFT
os.putenv('SDL_MOUSEDEV', '/dev/input/touchscreen')

pygame.init()
pygame.mouse.set_visible(False)

width, height = _WIDTH, _HEIGHT
size = (width, height)
screen = pygame.display.set_mode(size)
screen.fill(_WHITE)


def mouse_on_button(mouse, rect):
    x, y, width, height = rect.centerx, rect.centery, rect.w, rect.h
    return (x - width//2 < mouse[0] < x + width//2) and \
           (y - height//2 < mouse[1] < y + height//2)


def get_music_note_symbol(image_path, size, x, y):
    music_note_symbol = pygame.image.load(image_path)
    music_note_symbol = pygame.transform.scale(music_note_symbol, [size, size])
    music_note_symbol_rect = music_note_symbol.get_rect()
    music_note_symbol_rect = music_note_symbol_rect.move([int(x), int(y)])
    return music_note_symbol, music_note_symbol_rect


def draw_boundary(rect, color):
    x, y, width, height = rect.centerx, rect.centery, rect.w, rect.h
    vertex = [(x - width//2, y - height//2),
              (x - width//2, y + height//2),
              (x + width//2, y + height//2),
              (x + width//2, y - height//2)]
    boundary = pygame.draw.lines(screen, color, True, vertex, 3)
    return boundary

# Warning: this draws a large number of lines and needs to be optimized


def highlight(rect):
    draw_boundary(rect, _RED)
    pygame.display.flip()


def resume(rect):
    draw_boundary(rect, _WHITE)
    pygame.display.flip()


music_note_symbol, music_note_symbol_rect = get_music_note_symbol(
    _IMAGE_PATH, _SIZE, _WIDTH / 3, _HEIGHT / 3)
screen.blit(music_note_symbol, music_note_symbol_rect)
pygame.display.flip()

GPIO.setmode(GPIO.BCM)   # Set for broadcom numbering
GPIO.setup(_START, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(_PAUSE, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(_END, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(_QUIT, GPIO.IN, pull_up_down=GPIO.PUD_UP)


def start_pressed() -> bool:
    return not GPIO.input(_START)


def pause_pressed() -> bool:
    return not GPIO.input(_PAUSE)


def end_pressed() -> bool:
    return not GPIO.input(_END)


def quit_pressed() -> bool:
    return not GPIO.input(_QUIT)

# This function is invoked on every iteration of the main loop


def beat_pressed() -> bool:
    is_pressed = False
    for ev in pygame.event.get():

        mouse_pos = pygame.mouse.get_pos()
        if ev.type == pygame.QUIT:
            pygame.quit()
            exit()

        if mouse_on_button(mouse_pos, music_note_symbol_rect):
            if ev.type == pygame.MOUSEBUTTONDOWN:
                is_pressed = True
                highlight(music_note_symbol_rect)
                pygame.display.flip()
            if ev.type == pygame.MOUSEBUTTONUP:
                resume(music_note_symbol_rect)

    return is_pressed

# exchange.py
"""
A simple exchange package that uses a json file
to allow two processes to exchange information
"""

import json

_NAME = "./source/exc.json"

def leave_message(key: str, value) -> None:
    # `value` can be any JSON-serializable object (main.py passes an int)
    with open(_NAME, "r") as f:
        data = json.load(f)
    data[key] = value
    with open(_NAME, "w") as f:
        json.dump(data, f, indent=2)

def read_message(key: str):
    # Returns whatever value is stored under `key` in the exchange file
    with open(_NAME, "r") as f:
        data = json.load(f)
    return data[key]
/*exc.json*/
{
    "melody": "melody_0",
    "recording": "spectral",
    "audio": "melody_0",
    "update": -1
}
# logger.py
import logging
import logging.handlers

log = logging.getLogger('music_notation')
log.setLevel(logging.DEBUG)

logfile = "./logs/music.log"
handler = logging.handlers.RotatingFileHandler(
    logfile, maxBytes=4*1024, backupCount=3)
formatter = logging.Formatter('%(asctime)s-%(levelname)s:%(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)


def log_info(info: str) -> None:
    log.info(info)


def log_debug(info: str) -> None:
    log.debug(info)


def log_warning(info: str) -> None:
    log.warning(info)
# notation.py
"""
Functions related to generating notations:
- Generate a music21 stream of pitches
- Write these pitches into a lilypond source, a midi or a svg
"""

import music21

_CLEF = {
    "treble": music21.clef.TrebleClef(),
    "bass": music21.clef.BassClef(),
    "alto": music21.clef.AltoClef(),
    "tenor": music21.clef.TenorClef()
}


def pitchify(name: str, duration: float) -> music21.note.GeneralNote:
    # `duration` is a fraction of a whole note; music21 counts in quarter notes
    if name == "R":
        return music21.note.Rest(quarterLength=duration*4)
    return music21.note.Note(name, quarterLength=duration*4)


def pitch_name_preprocess(name: str) -> str:
    name = name.replace("♯", "#")
    name = name.replace("♭", "-")
    return name


def compose_pitches(clef: str, pitches: list, durations: list, meter: str = '4/4') -> music21.stream.Stream:
    stream = music21.stream.Stream()
    stream.timeSignature = music21.meter.TimeSignature(meter)
    if clef in _CLEF:
        stream.clef = _CLEF[clef]
    for pitch, duration in zip(pitches, durations):
        if isinstance(pitch, list):
            pitch_list = []
            for p in pitch:
                p = pitch_name_preprocess(p)
                pitch_list.append(pitchify(p, duration))
            noted = music21.chord.Chord(pitch_list)
        else:
            pitch = pitch_name_preprocess(pitch)
            noted = pitchify(pitch, duration)
        stream.append(noted)
    return stream


def write_music(stream: music21.stream.Stream, filename: str) -> None:
    stream.write("lilypond", fp="%s.ly" % filename)
    stream.write("midi", fp="%s.mid" % filename)


def export_music(stream: music21.stream.Stream, filename: str) -> None:
    converter = music21.lily.translate.LilypondConverter()
    converter.loadFromMusic21Object(stream)
    converter.createSVG("%s" % (filename))


if __name__ == "__main__":
    pitches = [['C4', 'D4'], 'C4', 'C4', 'C4']
    # Durations are fractions of a whole note (pitchify multiplies them by 4
    # to get music21 quarter lengths), so 1 here is a whole note
    durations = [1, 1, 1, 1]
    stream = compose_pitches('treble', pitches, durations)
    #write_music(stream, "notation_test")
    export_music(stream, "./results/ntest")
# pitch_detection.py
"""
Functions related to pitch detection:
- Slice the audio into small pieces
- Detect the pitch of each piece
- Decide the durations and clefs
"""

import librosa
import matplotlib.pyplot as plt
import statsmodels.api as sm
from scipy.signal import find_peaks
from typing import List, Tuple

_NLAGS = 5000
_VIS_POINTS = 10000
_SPECTRAL_NAME = "spectral"

def single_pitch_detection(data: List[float], sample_rate: int) -> str:
    """
    Single Pitch Detection
    - data: one slice of the audio samples, cut according to the beats
    - sample_rate: the sample rate of the audio in Hz
    Returns the detected note name, or 'R' (rest) when no pitch in the
    allowed range is found.
    Note: loading the audio with librosa (see pitch_detection) may raise a
    warning: librosa tries to use libsndfile first, and if that fails,
    it falls back on the audioread package
    """
    pitch_note = 'R'
    # The range of the allowed frequencies
    bass_clef_freq_min = librosa.note_to_hz("C2")
    treble_clef_freq_max = librosa.note_to_hz("C6")

    auto_correlation_function = sm.tsa.acf(data, nlags=_NLAGS)
    # Find peaks of the autocorrelation
    peaks = find_peaks(auto_correlation_function)[0]
    if len(peaks) > 0:
        # Choose the first peak as our pitch component lag
        lag = peaks[0]
    else:
        return pitch_note

    # Transform lag into frequency
    pitch_freq = sample_rate / lag
    if pitch_freq <= treble_clef_freq_max and pitch_freq >= bass_clef_freq_min:
        pitch_note = librosa.hz_to_note(pitch_freq)
    return pitch_note


def pitch_detection(file_path: str, beat_itvs: list, sample_rate: int) -> List[str]:
    """
    Detect pitches
    - file_path: the path of the target audio file
    - beat_itvs: beat intervals used to slice the audio
    - sample_rate: the sample rate
    """
    # Data acquired from the sound track
    data, _ = librosa.load(file_path, sr=sample_rate)
    i = 0
    start_ptr = 0   # The start of the data slicing
    N = len(data)   # The length of the data
    pitches = []    # pitches detected

    beat_itv = sum(beat_itvs)/len(beat_itvs)
    # Use "<" so that an empty slice at the very end is never analysed
    while start_ptr < N:
        if i < len(beat_itvs):
            data_itv = int(beat_itvs[i] * sample_rate)
        else:
            data_itv = int(beat_itv * sample_rate)

        # Slicing past N is clamped by Python, so the last piece is just shorter
        data_piece = data[start_ptr: start_ptr + data_itv]
        pitch = single_pitch_detection(data_piece, sample_rate)

        pitches.append(pitch)
        start_ptr += data_itv
        i += 1

    return pitches


def write_pitch(pitches: List[str], file_path: str) -> None:
    with open(file_path, "w") as fp:
        fp.write(" ".join(pitches))


def pitch_duration_detection(note: int, pitches: List[str]) -> Tuple[list, list]:
    pitches_num = len(pitches)
    start_ptr = 0
    pitches_partition = []
    pitches_duration = []

    while start_ptr < pitches_num:
        pitches_partition.append(pitches[start_ptr])
        pitches_duration.append(1 / note)
        if (start_ptr + note <= pitches_num):
            for i in range(note - 1):
                if pitches[start_ptr + i + 1] == pitches_partition[-1]:
                    pitches_duration[-1] += 1 / note
                else:
                    pitches_partition.append(pitches[start_ptr + i + 1])
                    pitches_duration.append(1 / note)

        else:
            for i in range(pitches_num - start_ptr - 1):
                if pitches[start_ptr + i + 1] == pitches_partition[-1]:
                    pitches_duration[-1] += 1 / note
                else:
                    pitches_partition.append(pitches[start_ptr + i + 1])
                    pitches_duration.append(1 / note)

        start_ptr += note

    return pitches_partition, pitches_duration


def within_the_range(val, low, high) -> bool:
    # Avoid shadowing the built-in min() and max()
    return low <= val <= high


def clef_detection(pitches: List[str]) -> str:
    treble_clef_freq_min = librosa.note_to_hz("C4")
    treble_clef_freq_max = librosa.note_to_hz("C6")
    alto_clef_freq_min = librosa.note_to_hz("C3")
    alto_clef_freq_max = librosa.note_to_hz("C5")
    bass_clef_freq_min = librosa.note_to_hz("C2")
    bass_clef_freq_max = librosa.note_to_hz("C4")

    treble_cnt = 0
    alto_cnt = 0
    bass_cnt = 0

    clef = ""

    for pitch in pitches:
        if pitch == 'R':
            continue
        if within_the_range(librosa.note_to_hz(pitch), treble_clef_freq_min, treble_clef_freq_max):
            treble_cnt += 1
        if within_the_range(librosa.note_to_hz(pitch), alto_clef_freq_min, alto_clef_freq_max):
            alto_cnt += 1
        if within_the_range(librosa.note_to_hz(pitch), bass_clef_freq_min, bass_clef_freq_max):
            bass_cnt += 1

    max_cnt = max(treble_cnt, alto_cnt, bass_cnt)
    if max_cnt == treble_cnt:
        clef = "treble"
    elif max_cnt == alto_cnt:
        clef = "alto"
    else:
        clef = "bass"

    return clef


def wave_plot(data, start_time: float, sample_rate: int, beats: List[float], time_duration: float = 5) -> None:
    # data: array of audio samples; beats: beat times in seconds
    start_ptr = int(start_time * sample_rate)
    end_ptr = int((start_time + time_duration) * sample_rate)
    # Keep the step at least 1, a zero step would make the slice below fail
    sample_itv_plot = max(1, int(sample_rate*time_duration//_VIS_POINTS))
    data_plot = data[start_ptr:end_ptr:sample_itv_plot]
    data_plot_len = len(data_plot)
    beats_plot = []
    j = len(beats) - 1
    while j >= 0:
        beat = beats[j]
        if beat >= start_time and beat <= start_time + time_duration:
            beats_plot.append(beat)
        if beat < start_time:
            break
        j -= 1

    # Plotted points are sample_itv_plot samples apart, so derive each time from the
    # sample rate; this keeps beats aligned even when the audio is shorter than the window
    time_plot = [(start_time + idx * sample_itv_plot / sample_rate)
                 for idx in range(data_plot_len)]
    fig, ax = plt.subplots()
    ax.plot(time_plot, data_plot, 'k', linewidth=1)
    for beat in beats_plot:
        ax.axvline(beat, linestyle='--', color='r', linewidth=2)
    fig.set_figwidth(12)
    fig.set_figheight(3)
    plt.grid(False)
    plt.axis('off')
    fig.savefig("%s.svg"%(_SPECTRAL_NAME), format='svg')
    plt.close(fig=fig)


if __name__ == "__main__":
    file_path = "/home/pi/automatic_music_notation/sound_track/C2Long.wav"
    data, _ = librosa.load(file_path, sr=22050)
    wave_plot(data, 0, 22050, [1, 2, 3, 4])
    # pitch_note = single_pitch_detection(data, 22050)
    # print(pitch_note)
    # pitches = []
    # np.random.seed(1)
    # pitches_range = np.random.randint(65, 71, 16 * 2)
    # pitches_random = [chr(i) for i in pitches_range]
    # pitches_all_A = ['A'] * 32
    # pitches_test1 = ['A'] * 4 + ['B'] * 4 + ['A'] * 4 + ['C'] * 2 + ['A'] * 4 + ['D'] * 4
    # pitches_test2 = ['A'] * 6 + ['B']
    # pitches_partition, pitches_duration = pitch_duration_detection(16, pitches_test2)
    # clef = clef_detection(pitches_test2)
    # print("pitches: %s" %str(pitches))
    # print("pitches_partition: %s" %str(pitches_partition))
    # print("pitches_duration: %s" %str(pitches_duration))
    # print("clef: %s" %str(clef))
    # pitch_freqs = pitch_detection(file_path, [0.5], 22050)
    # print(pitch_freqs)
# recording.py
"""
Functions related to sound recording:
- Recording the voice from the USB-Microphones (one channel)
"""

import sounddevice as sd
import numpy as np
from scipy.io.wavfile import write

from logger import log_debug

_SAMPLE_RATE = 44100


def record_long_piece(seconds: float, fs: int = _SAMPLE_RATE) -> np.ndarray:
    piece = sd.rec(int(seconds * fs), samplerate=fs, channels=1)
    return piece


def write_piece(piece: np.ndarray, filename: str, fs: int = _SAMPLE_RATE) -> None:
    write(filename, fs, piece)


def extract_pieces(piece: np.ndarray, event_times: list, fs: int = _SAMPLE_RATE) -> np.ndarray:
    # Stop the recording
    sd.stop()
    log_debug("Recorded events: %s" % (str(event_times)))
    # The log must be non-empty and begin with the start event
    if not event_times or event_times[0][0] != 'start':
        log_debug("Event logs are corrupted")
        raise SystemExit(1)
    # Cut and concatenate the pieces according to the pause, resume events
    pieces = []
    prev_sec = event_times[0][1]
    paused = False
    for event, sec in event_times:
        if event == "pause":
            short_piece = piece[int(prev_sec*fs):int(sec*fs), :]
            log_debug("Get a piece between %f and %f" % (prev_sec, sec))
            pieces.append(short_piece)
            paused = True
        elif event == "resume":
            prev_sec = sec
            paused = False
        elif event == "end" and not paused:
            short_piece = piece[int(prev_sec*fs):int(sec*fs), :]
            log_debug("Get a piece between %f and %f" % (prev_sec, sec))
            pieces.append(short_piece)
    large_piece = np.vstack(pieces)
    return large_piece
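

# Illustrative sketch (not part of the original module): the pause/resume
# bookkeeping of extract_pieces() reduced to plain (start, end) intervals in
# seconds, without touching any audio data. The helper name and the sample
# event log below are assumptions made for this example.
def _kept_intervals_sketch(event_times):
    intervals = []
    prev_sec = event_times[0][1]
    paused = False
    for event, sec in event_times:
        if event == "pause":
            # Close the interval that was being recorded
            intervals.append((prev_sec, sec))
            paused = True
        elif event == "resume":
            # A new interval starts where recording resumes
            prev_sec = sec
            paused = False
        elif event == "end" and not paused:
            intervals.append((prev_sec, sec))
    return intervals


# Example: recording starts at 0 s, pauses from 4 s to 6 s and ends at 9 s, so
# only the audio in [0, 4] and [6, 9] would be kept:
# _kept_intervals_sketch([("start", 0.0), ("pause", 4.0), ("resume", 6.0), ("end", 9.0)])
# returns [(0.0, 4.0), (6.0, 9.0)]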
# server.py
"""
A simple server designed to cast a "virtual printer"
- Serve the index page
- Serve the requested data
"""

import json
from flask import Flask, request, jsonify, send_file
from gevent.pywsgi import WSGIServer
from gevent import monkey

from exchange import read_message, leave_message

app = Flask(__name__)

@app.route('/')
def index():
    filename='index.html'
    return send_file(filename)

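# The URL variable must be declared in the route so Flask passes it to audio()
@app.route('/audio/<path>')
def audio(path):
    filepath = "./results/%s"%(path)
    try:
        with open(filepath, "rb") as f:
            data = f.read()
        return data
    except OSError:
        # Missing or unreadable file: answer with an empty body
        return ''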

@app.route('/data',methods=['GET'])
def dataServe():
    reqs=request.args.get('request')
    # A request without the 'request' parameter gets an empty answer
    if reqs is None:
        return ""
    reqs = json.loads(reqs)
    if reqs['type'] == 'notation':
        melody = read_message("melody")
        if melody == "":
            return ""
        resp = None
        with open("./%s.svg"%(melody), "r") as f:
            resp = f.read()
        return resp
    elif reqs['type'] == 'recording':
        recording = read_message("recording")
        if recording == "":
            return ""
        resp = None
        with open("./%s.svg"%(recording), "r") as f:
            resp = f.read()
        return resp
    elif reqs['type'] == 'audio':
        recorded = read_message("audio")
        if recorded == "":
            return jsonify({"name":""})
        return jsonify({"name":"%s.wav"%(str(recorded))})
    elif reqs['type'] == 'ping':
        updated = read_message("update")
        if updated == 1:
            return jsonify({"update":1})
        elif updated == 2:
            leave_message("update", -1)
            return jsonify({"update":2})
        else:
            return jsonify({"update":-1})
    return ""
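

# Illustrative sketch (not part of the original server): how a client could
# build the query string that dataServe() parses. The helper name is an
# assumption; the default base URL reuses the host and port hard-coded in the
# __main__ block below.
import json
from urllib.parse import urlencode


def _data_url_sketch(req_type, base="http://10.49.53.185:8880"):
    # The request is a JSON object passed in the 'request' query parameter
    return base + "/data?" + urlencode({"request": json.dumps({"type": req_type})})
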

if __name__ == '__main__':
    monkey.patch_all()

    host: str = '10.49.53.185'
    port: int = 8880

    print('SERVING ON: ', 'http://' + host + ':' + str(port))

    http_server = WSGIServer((host, port), app)
    http_server.serve_forever()
<!-- index.html -->
<!DOCTYPE html>
<html>
    <head>
        <meta http-equiv="Cache-Control" content="no-cache, no-store, must-revalidate" />
        <meta http-equiv="Pragma" content="no-cache" />
        <meta http-equiv="Expires" content="0" />
        <title>MUSIC NOTATION</title>
        <script type="text/javascript" src="https://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js"></script>
    </head>
    <body>
        <div style = "position:absolute; left:80px; top:10px">
          <h1>MUSIC NOTATION</h1>
          <p>© Haoyang Li & Yue Zhao</p>
        </div>
        <div style = "position:absolute; left:80px; top:110px">
          <h1>BEAT & RECORDING</h1>
          <div id="recording"></div>
        </div>
        <div style = "position: absolute; left: 420px; top:125px">
            <audio id = "recorded" controls>
            </audio>
        </div>
        <div style = "position:absolute; left:80px; top:500px">
          <h1>NOTATION</h1>
          <div id="notation"></div>
        </div>
        <script>
            function requestData(reqs,messageHandler,errorHandler){
                var host='10.49.53.185';
                var port='8880';
                $.ajax({
                    type : "GET",
                    contentType: "application/json;charset=UTF-8",
                    url : "http://"+host+":"+port+"/data",
                    data : {"request":JSON.stringify(reqs)},
                    success :function(result){
                        messageHandler(result);
                    },
                    error : function(e){
                        errorHandler(e);
                    }
                });
            }

            function update_notation(m){
                document.getElementById("notation").innerHTML = m;
            }

            function update_recording(m){
                document.getElementById("recording").innerHTML = m;
            }

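            function update_audio(m){
                // Nothing has been recorded yet
                if (!m.name) { return; }
                var player = document.getElementById("recorded");
                player.innerHTML = "<source src=\"/audio/"+m.name+"\">";
                // Reload so the player picks up the new source
                player.load();
            }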

            function request_notation(){
                var request = {
                    "type":"notation"
                }
                requestData(request, update_notation, (e)=>{})
            }

            function request_recording(){
                var request = {
                    "type":"recording"
                }
                requestData(request, update_recording, (e)=>{})
            }

            function request_audio(){
                var request = {
                    "type":"audio"
                }
                requestData(request, update_audio, (e)=>{})
            }

            function ping_result(m){
                if (m.update == 2){
                    request_notation();
                    request_audio();
                    request_recording();
                }
            }

            function periodic_ping(){
                var request = {
                    "type":"ping"
                }
                requestData(request,ping_result, (e)=>{})
            }

            // Ping the server periodically for updates
            setInterval(periodic_ping, 5000);
        </script>
    </body>
</html>